perf: [iceberg] Use protobuf instead of JSON to serialize Iceberg partition values#3247
hsiang-c left a comment:
Replacing JSON with Protocol Buffers LGTM.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #3247 +/- ##
============================================
+ Coverage 56.12% 60.07% +3.94%
- Complexity 976 1438 +462
============================================
Files 119 172 +53
Lines 11743 15927 +4184
Branches 2251 2631 +380
============================================
+ Hits 6591 9568 +2977
- Misses 4012 5031 +1019
- Partials 1140 1328 +188
/// This replaces JSON parsing with direct protobuf deserialization with a more compact
/// representation (e.g., timestamps as integers vs strings).
nit: we can remove the part of the comment that explains how this used to work.
checkIcebergNativeScan(
  "SELECT COUNT(*) FROM s3_catalog.db.large_partitioned_test WHERE partition_id IN (0, 50, 99)")

spark.sql("DROP TABLE s3_catalog.db.large_partitioned_test")
(nit) You can try DROP TABLE s3_catalog.db.large_partitioned_test PURGE to remove files on disk.
oneof value {
  int32 int_val = 2;
  int64 long_val = 3;
  int64 date_val = 4; // days since epoch
  int64 timestamp_val = 5; // microseconds since epoch
  int64 timestamp_tz_val = 6; // microseconds with timezone
  string string_val = 7;
  double double_val = 8;
  float float_val = 9;
  bytes decimal_val = 10; // unscaled BigInteger bytes
  bool bool_val = 11;
  bytes uuid_val = 12;
  bytes fixed_val = 13;
  bytes binary_val = 14;
We may want to consider consolidating this with the existing Literal defined in protobuf. This does not need to happen for the current PR.
message Literal {
oneof value {
bool bool_val = 1;
// Protobuf doesn't provide int8 and int16, we put them into int32 and convert
// to int8 and int16 when deserializing.
int32 byte_val = 2;
int32 short_val = 3;
int32 int_val = 4;
int64 long_val = 5;
float float_val = 6;
double double_val = 7;
string string_val = 8;
bytes bytes_val = 9;
bytes decimal_val = 10;
ListLiteral list_val = 11;
}
These are Iceberg types though.
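For the `decimal_val` field above (commented as "unscaled BigInteger bytes"), a minimal Rust sketch of how such bytes could be turned back into an integer on the native side. It assumes the bytes come from Java's `BigInteger.toByteArray()`, which yields big-endian two's complement, and that the unscaled value fits in an `i128`. The function name `unscaled_from_be_bytes` is hypothetical and is not taken from this PR.

```rust
/// Hypothetical helper (not from the PR): decode big-endian two's-complement
/// bytes, the layout produced by Java's `BigInteger.toByteArray()`, into an i128.
/// Returns None for empty input or for values wider than 16 bytes.
fn unscaled_from_be_bytes(bytes: &[u8]) -> Option<i128> {
    if bytes.is_empty() || bytes.len() > 16 {
        return None;
    }
    // Sign-extend: pad with 0xFF when the top bit of the first byte is set,
    // otherwise pad with 0x00.
    let fill: u8 = if (bytes[0] & 0x80) != 0 { 0xFF } else { 0x00 };
    let mut buf = [fill; 16];
    // Right-align the input so the least significant byte stays last.
    buf[16 - bytes.len()..].copy_from_slice(bytes);
    Some(i128::from_be_bytes(buf))
}
```

Under these assumptions, `unscaled_from_be_bytes(&[0x04, 0xD2])` gives `Some(1234)` and `unscaled_from_be_bytes(&[0xFB, 0x2E])` gives `Some(-1234)`. The scale still has to come from the field's declared decimal type, since only the unscaled value is carried in the bytes.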
/**
 * Legacy JSON serialization function - removed in favor of protobuf. Kept as reference for
 * conversion logic.
 */
private def partitionValueToJson(fieldTypeStr: String, value: Any): JValue = {
Found some unused code as a result of removing this. Thanks!
@parthchandra there is a clippy failure.
Force-pushed from 8319470 to e8b87e7.
@parthchandra do you have benchmarks showing the performance improvement?
Merged. Thanks @andygrove @hsiang-c
(Sorry, didn't notice this before merging).
Didn't seem to have too much of an impact.
Rationale for this change
We see increased GC times in jobs with Iceberg scans over a large number (10K-100K) of partitions.
What changes are included in this PR?
Partition values are currently passed to native code by serializing them into a JSON string. This PR serializes them with Protobuf instead.
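To give a rough sense of why an integer field can be smaller than a string, here is a small Rust sketch of protobuf's base-128 varint encoding, which is how `int64` fields are written on the wire. It is an illustration only: `encode_varint` and `int64_payload_len` are hypothetical names, the PR itself relies on generated protobuf code, and the exact JSON layout the old code produced is not shown in this thread.

```rust
/// Encode a value as a protobuf base-128 varint: 7 payload bits per byte,
/// least significant group first, high bit set on every byte except the last.
fn encode_varint(mut v: u64) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        let byte = (v & 0x7F) as u8;
        v >>= 7;
        if v == 0 {
            out.push(byte);
            return out;
        }
        out.push(byte | 0x80);
    }
}

/// Hypothetical helper: payload size of an `int64` field, excluding the tag byte.
/// Protobuf `int64` reinterprets the two's-complement bits as unsigned, so
/// negative values always take 10 bytes.
fn int64_payload_len(v: i64) -> usize {
    encode_varint(v as u64).len()
}
```

For a timestamp of 1_700_000_000_000_000 microseconds since epoch, `int64_payload_len` returns 8, compared with 16 characters for the same number written as decimal digits, before any JSON quoting or field names.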
How are these changes tested?
Added a new unit test for a large number of partitions.
AI note: large parts were generated using Claude Code.